AWS Glue の Dynamicframeを使わずDataframeを用いて自在にCSV/TSVファイルを出力する
はじめに
Dynamicframeは、フォーマットにCSVファイルを指定できますが、空白を含む文字列は自動的にダブルクォーテーションで括られてしまったり、デリミタ文字列はカンマのみとなります。今回はこの課題を解決するため、Dynamicframeを使わずDataframeを用いて自在にCSV/TSVファイル出力する方法をご紹介します。
DynamicframeによるCSVファイル出力の課題
空白を含む文字列は自動的にダブルクォーテーションで括られてしまう
例えば、一般的なISOフォーマットのTIMESTAMP形式2018-04:30 12:34:56
のように空白が含まれている文字列をDynamicframeでCSVファイル出力すると、"2018-04:30 12:34:56"
のように自動的にダブルクォーテーションで括られてしまいます。ダブルクォーテーションで括られたTIMESTAMP形式の文字列は、Amazon Redshift SpectrumやAmazon AthenaでTIMESTAMP型のデータとして自動認識できません。
つまり、Dynamicframeを用いると、データソースのCSVファイルのTIMESTAMP型のデータが、Amazon Redshift SpectrumやAmazon AthenaでTIMESTAMP型のデータとして認識できないCSVファイルフォーマットで出力されてしまいます。
デリミタ文字列はカンマのみ
Dynamicframeの出力オプションは、formatの指定できますが、formatにCSV
を指定するとデリミタ文字列は、カンマのみとなります。つまり、タブ文字など任意の文字によって区切ることはできません。
つまり、Dynamicframeを用いると、ソースデータのTSVファイルからTSVファイルのターゲットデータを生成できません。
Dataframeを用いて自在にCSV/TSVファイル出力する
カラムデータは、yyyy-MM-dd HH:mm:ss
形式のTIMESTAMPの文字列を生成して出力したとします。
Dataframeを用いたCSVファイルをgzip出力する方法
datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://mybucket/posdata", "compression": "gzip"}, format = "csv", transformation_ctx = "datasink2")
生成される上記のコードの代わりに、
df_out = applymapping1.toDF() df_out.write.mode("append").option("sep", ",").csv("s3://mybucket/posdata", compression="gzip")
のようにDataframeを用いて出力します。
- 1行目は、toDF()関数を用いて、Dynamicframe から Dataframe に変換しています。変換してしまえば後は、Apache Spark 標準の DataframeのAPIを利用して出力できます
- 2行目は、セパレータ文字列にカンマを指定(
option("sep", ",")
)と、データのGZIP圧縮を指定(compression="gzip"
)して出力します。
Dataframeを用いてCSVファイル出力すると、TIMESTAMP形式2018-04:30 12:34:56
がダブルクォーテーションで括られず、そのまま出力できます。
なお、上記の方法で、TIMESTAMP型をそのまま出力すると、ISO/W3Cフォーマット(例. 2016-04-30T12:34:56.000Z
)で出力されます。
Dataframeを用いたTSVファイルをgzip出力する方法
セパレータ文字列にタブを指定(option("sep", "\t")
)して出力します。それ以外はCSVファイル出力と同様です。
df_out = applymapping1.toDF() df_out.write.mode("append").option("sep", "\t").csv("s3://mybucket/posdata", compression="gzip")
最後に
公式のチュートリアルやドキュメントのサンプルは、CSVファイルをParquetファイル(カラムナファイルフォーマット)に変換する例がほとんどで、CSVからCSVやTSVからTSVに変換する例はありませんでした。そんなときは、Sparkが提供するDataframeでワークアラウンドがないか検討すると良いでしょう。しかし、Spark初心者が安易にDataframeでなんでも済ませようというのはお薦めしません。AWS Glueが提供するDynamicFrameは、とても良くできたフレームワークであり、Sparkの知見がないエンジニアでも容易にETLコードを安全に書くことができますので、DynamicFrameでできることは出来る限り、DynamicFrameを利用することをお薦めします。そして、将来的にはDataFrameを使わず、DynamicFrameのみでETLコードが書けるようになることを期待しています。